import java.util.concurrent.Flow;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.SubmissionPublisher;

public class MyProcessor extends SubmissionPublisher<String> implements Flow.Processor<Integer,String> {

    private Subscription subscription;

    @Override
    public void onSubscribe(Subscription subscription) {
        //保存订阅关系，需要用它来给发布者响应
        this.subscription = subscription;

        //请求一个数据
        this.subscription.request(1);
    }

    @Override
    public void onNext(Integer item) {
        //接受到一个数据，处理
        System.out.println("处理器接收到的数据（1）："+ item);

        //过滤掉小于0的，然后发布出去
        if(item>0){
            this.submit("转换后的数据（2）："+ item);
        }

        //处理完调用request再请求一个数据
        this.subscription.request(1);
    }

    @Override
    public void onError(Throwable throwable) {
        //出现异常
        throwable.printStackTrace();
        //告诉发布者，后面不接受数据
        this.subscription.cancel();
    }

    @Override
    public void onComplete() {

        //全部数据处理完了(发布者关闭了)
        System.out.println("处理器处理完了");
        //关闭发布者
        this.close();
    }
}
